package com.k2data.server;

import com.k2data.common.Constants;
import com.k2data.common.EnvConf;
import com.k2data.producer.KafkaClient;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;

/**
 * Created by lijianing on 17-11-6.
 */
public class Server {
    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
    private List<ChannelHandler> handlers;
    private static KafkaClient[] kafkaClients ;
    private static int producerCount = Integer.parseInt(EnvConf.getEvnValue(Constants.KAFKA_PRODUCER_NUM,"5"));

    public Server(List<ChannelHandler> handlers) {
        this.handlers = handlers;
    }

    public void start() throws InterruptedException {
        int portNumber = 7878;
        try {
            portNumber = Integer.parseInt(EnvConf.getEvnValue(Constants.SERVER_PORT_NUMBER,"7878"));
        } catch (NumberFormatException e) {
            LOG.error("server port must be number." + e.getMessage());
            System.exit(-1);
        }
        String topicName = EnvConf.getEvnValue(Constants.TOPIC_NAME,"");
        String kafkaServerUrl = EnvConf.getEvnValue(Constants.KAFKA_SERVER_URL,"");

        LOG.info("env SERVER_PORT_NUMBER: " + portNumber);
        LOG.info("env TOPIC_NAME: " + topicName);
        LOG.info("env KAFKA_SERVER_URL: " + kafkaServerUrl);
        if(portNumber == 0 || topicName.trim().isEmpty() || kafkaServerUrl.trim().isEmpty()){
            LOG.error("env config containing the error entry");
            System.exit(-1);
        }
        kafkaClients = new KafkaClient[producerCount];
        for(int i =0; i<producerCount ; i++) {
            try {
                LOG.info("init kafka producer: "+(i+1));
                kafkaClients[i] = new KafkaClient(topicName, kafkaServerUrl);
            } catch (ExecutionException e) {
                LOG.error("kafka producer can't send message to server: " + kafkaServerUrl);
                e.printStackTrace();
                System.exit(1);
            }
        }

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup);
            b.channel(NioServerSocketChannel.class);
            if(handlers != null && handlers.size() > 0) {
                for(int i = 0; i < handlers.size(); i++) {
                    b.childHandler(handlers.get(i));
                }
            }
            ChannelFuture f = b.bind(portNumber).sync();
            LOG.info("netty server start.");
            f.channel().closeFuture().sync();
            LOG.info("netty server stop.");
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    public static KafkaClient getClient(){
        int n = new Random().nextInt(producerCount);
        return  kafkaClients[n];
    }
}
